博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tornado入门看这一篇足以
阅读量:4304 次
发布时间:2019-05-27

本文共 41085 字,大约阅读时间需要 136 分钟。

如果觉得这排版不好,建议去看

文章目录

1.简介

Tornado是一个python语言的web服务框架,它是基于社交聚合网站FriendFeed的实时信息服务开发而来的。

2007年由4名Google前软件工程师一起创办了FriendFeed,旨在使用户能够方便地跟踪好友在Facebook和Twitter等多个社交网站上的活动。结果两年后,Facebook宣布收购FriendFeed

Tornado使FriendFeed使用的可扩展的非阻塞Web服务器及其相关工具的开源版本,这个Web框架看起来有些像web.py或 Google的webapp,不过为了更加有效地利用非阻塞服务器环境,Tornado这个Web框架还包含了一些相关的有用工具和优化。

与其他web框架一些区别

  • 非阻塞式的服务器

速度相当快,使用了epoll非阻塞的方式,每秒可以处理数以千计的连接。

  • 单进程单线程异步IO的网络模型

特点

  • 轻量级Web框架
  • 异步非阻塞IO处理方式(epoll)

单进程单线程异步IO的网络模式, epoll的异步网络IO

  • 出色的抗负载能力
  • 不依赖多进程或多线程(协程机制)
  • WSGI全栈替代产品

WSGI把应用(Application)和服务器(Server)结合起来,Tornado既可以是WSGI应用也可以是WSGI服务。

既是WebServer也是WebFramework

  • 支持长连接
  • AysncIo
  • http异步客户端AsyncHttpClient

常用tornado参考doc

2.架构

可以先跳过架构这块介绍,到安装使用。(使用过了tornado,再来看整体架构就清晰容易多了)

2-1.分层

Tornado不仅仅是一个Web框架,它完整地实现了HTTP服务器和客户端,再此基础上提供了Web服务,它可分为四层:

  • Web框架:最上层,包括处理器、模板、数据库连接、认证、本地化等Web框架所需功能。
  • HTTP/HTTPS层:基于HTTP协议实现了HTTP服务器和客户端
  • TCP层:实现TCP服务器负责数据传输
  • Event层:最底层、处理IO事件

在这里插入图片描述

注意颜色分层

  • httpserver: 服务于web模块的一个简单的HTTP服务器的实现

Tornado的HTTPConnection类用来处理HTTP请求,包括读取HTTP请求头、读取POST传递的数据,调用用户自定义的处理方法,以及把响应数据写给客户端的socket。

  • iostream: 对非阻塞式的socket的封装以便于常见读写操作

为了在处理请求时实现对socket的异步读写,Tornado实现了IOStream类用来处理socket的异步读写。

  • ioloop 核心的I/O循环

Tornado为了实现高并发和高性能,使用了一个IOLoop事件循环来处理socket的读写事件,IOLoop事件循环是基于Linux的epoll模型,可以高效地响应网络事件,这是Tornado高效的基础保证。

2-2.模块

Tornado是一个轻量级框架,它的模块不多最重要的模块是web,web模块包含了Tornado大部分主要功能的Web框架,其他模块都是工具性质的,以便让Web模块更加有用。

Core Web Framework 核心Web框架

  • tornado.web 包括Web框架大部分主要功能,包括RequestHandler和Application类。
  • tornado.httpserver一个无阻塞HTTP服务器的实现
  • tornado.template模板系统
  • tornado.escape HTML、JSON、URLs等编码解码和字符串操作
  • tornado.locale国际化支持

Asynchronous Networking 异步网络底层模块

  • tornado.ioloop 核心IO循环
  • tornado.iostream对非阻塞的Socket的简单封装以方便常用读写操作
  • tornado.httpclient无阻塞的HTTP服务器实现
  • tornado.netutil网络应用的实现主要是TCPServer类

Integration With Other Services 系统集成服务

  • tornado.auth 使用OpenId和OAuth进行第三方登录
  • tornado.databaseMySQL服务端封装
  • tornado.platform.twisted在Tornado上运行Twisted实现的代码
  • tornado.websocket实现和浏览器的双向通信
  • tornado.wsgi其他Python网络框架或服务器的相互操作

Utilities 应用模块

  • tornado.autoload产生环境中自动检查代码更新
  • tornado.gen基于生成器的接口,使用该模块 保证代码异步运行。
  • tornado.httputil分析HTTP请求内容
  • tornado.options解析终端参数
  • tornado.process多进程实现的封装
  • tornado.stack_context异步环境中对回调函数上下文保存、异常处理
  • tornado.testing单元测试

2-3.请求流程

注意请求和返回的流程(颜色区别,以IO Loop为起点)

在这里插入图片描述
在这里插入图片描述

3.安装

注意版本Tornado和python的版本:

  • Tornado4.3 可以运行在Python2.6、2.7、3.2+
  • Tornado6.0需要Python3.5.2+

3-1.python环境安装

python环境可以分为本地环境、虚拟环境、远程环境

  • 本地环境

本机安装个python,然后配置下环境变量。

  • 虚拟环境

借用一些软件(比如:Anaconda、Virtualenv/Virtualenvwrapper),安装虚拟环境,可以达到隔离效果,多个python在一台机器上也不会冲突

  • 远程环境

非本机,一台远程机器上python环境

我使用的是Anaconda,如何使用和安装可以参照这篇blog。

至于virtualenv和本机安装环境,自行用搜索引擎解决下

3-2.开发工具

主流的python开发工具

  • pycharm
  • vscode
  • eclipse

我使用的是pycharm,如何安装也不多介绍了。里面有下载地址:

3-3.pip使用

pip如何使用,如何切换国内网,也都看这篇blog吧。

3-4.安装tornado

# 使用anaconda创建一个tornado虚拟环境conda create --name iworkh-tornado python=3.7# 查看有哪些环境conda info -e# 切换到tornado虚拟环境activate iworkh-tornado# pip安装tornadopip install tornado# 或使用condo安装tornadoconda install tornado

3-5.创建项目

pycharm创建项目

在这里插入图片描述

因为前异步使用anaconda创建一个tornado虚拟环境,所以就使用已存在的环境了。

当然如果前面没有自己创建虚拟环境,也可以使用pycharm里的new environment using XXX(选中虚拟化工具),来创建新的。

至此,tornado的开发环境基本ok了,如果有问题,可在留言区留言,或者QQ联系我。

4.tornado优势

4-1.web框架比较

提到python web框架,那么就想到了Django和Flask,那么他们有啥区别呢?

可以读下这几篇blog

总结以几点下:

  • 重量级

django比较重量级;flask和tornado都比较轻量级

  • 开发效率

django大而全的框架,效率高;

  • 并发能力

django和flask同步,能力查;tornado异步框架,性能相对好

  • 长连接

tornado适合用于开发长连接多的web应用。比如股票信息推送、网络聊天等。

  • 数据库与模板处理性能

Tornado与Flask旗鼓相当;django慢

  • 部署方面

tornado即使web框架,也是web服务;flask和tornado知识web框架,需要借助web服务器来部署

4-2.如何高并发

  • 异步非阻塞IO(基于epoll事件驱动)
  • 通过协程来实现高并发处理

4-3.建议

  • 不在tornado中使用大量的同步IO操作 (发挥不了tornado的特性)
  • 不要将耗时的操作都往线程池中扔 (这并不能提高tornado多大的性能)
  • tornado的线程和协程是两个概念
  • 编程时使用asnyc和await操作,而不建议coroutine协程装饰器和yield

coroutine是一个gen方式历史过滤方案,在python3.x后,已经使用了asnyc

5.tornado异步编程

我们都知道cpu的处理能力要远远大于IO的处理能力,而IO是阻塞操作,即io阻塞了,cpu即在休息,在虚度时间,在cpu资源,(浪费是可耻的,我们要利用最大化)。

当然不只本地磁盘IO,我们访问网络,常使用的requestsurllib的库都是同步的。

5-1.io模型

io模型主要分为同步/异步阻塞/非阻塞

同步和异步:是相对于调用方而言的

阻塞和非阻塞: 是相对于被调用方而言的

譬如:用户程序(用户空间,A)调用了文件系统内容(内核空间,B),假设B操作比较耗时要10秒钟。

对于A而言,是立刻得到B的返回结果呢,还是一直等着。等B处理完能够通知A,还是A要一直在等着?(同步/异步)

对于B而言,接收到A的请求了,处理完后,如何能够通知到A,不用限制着A,要一直等着?(阻塞/非阻塞)

阻塞

安装下requests工具表

pip install requests

代码

import requestsblogHtml = requests.get("https://iworkh.gitee.io/blog/").content.decode("utf8")print(blogHtml)

requests.get是阻塞,如果网不好或服务器响应很慢的话,那阻塞这回很耗时

非阻塞

import socket# AF_INET:服务器之间网络通信,SOCK_STREAM 流式socketclient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置非阻塞client.setblocking(False)# 传个tupleaddress = "www.baidu.com"try:    client.connect(("www.baidu.com", 80))except BlockingIOError as e:    print("连接中...干些其他事")    passwhile True:    try:        client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", address).encode("utf8"))        print("连接成功")        break    except OSError as e:        pass# 定义bytesdata = b""while 1:    try:        rec = client.recv(512)        if rec:            data += rec        else:            print(data.decode("utf8"))            break    except BlockingIOError as e:        pass

setblocking设置为非阻塞的,然后后面一直使用while循环来处理,直到正常返回结果。(比太关注死循环的写法,而要关注于非阻塞了)

5-2.io多路复用

IO多路复用:一个进程监控多个描述符fds,当描述符状态为read、write等状态时,通知程序进行相应的操作。

select、poll和epoll都是多路复用机制。它们本质上也是同步IO,它们在读写过程是阻塞的。

而异步IO是非阻塞,由内核将数据从内核空间拷贝到用户空间

像了解其原理德胡啊,强力建议去看篇blog

5-2-1.select

select函数监控3类文件描述符: readfds,writefds,exceptfds。

window和linux都支持,但是单进监控的文件描述符有限,linux一般为1024。

在这里插入图片描述

  • 没有数据时,进程A被放到等待队列,即cpu没有处理进程A

在这里插入图片描述

  • 当有数据来,socket会处于就绪状态,将进程A放到cpu队列中去工作
  • 但是线程A不知道哪些socket是就绪的,所以只能全部遍历一边,去找到就状态了。

5-2-3.poll

poll使用一个pollfd指针实现。pollfd监控even事件,虽们没有最大限制,但是太多也影响性能。需要遍历所有的描述符。

某时刻,文件描述符,处于就绪状态很少。但是却要遍历所有描述符,因此当文件描述符很多时,性能会下降。

5-2-4.epoll

epoll对select和poll增强。是在2.6内核中提出的。没有最大文件描述符的限制。

epoll使用一个文件描述符区管理多个描述符,将用户的文件描述符event存在内核时间表中,这样用户空间和内核只要copy一次。

由于唤醒的进程,不知道哪些是就绪状态,所以可以将就绪状态的socket存下以便查找,不用都所有都遍历一遍。

在这里插入图片描述

  • 使用了eventpoll来记录,监控进程和哪些socket是处于就绪状态

在这里插入图片描述

  • 当Socket接收到数据,中断程序一方面修改 Rdlist,另一方面唤醒 eventpoll 等待队列中的进程,进程A再次进入运行状态
  • 也因为 Rdlist 的存在,进程 A 可以知道哪些 Socket 发生了变化。

5-2-5.回调异步

上面异步代码,都是while和try来处理,这我们修改下使用select方式进行改造

import socketfrom selectors import DefaultSelector, EVENT_WRITE, EVENT_READselector = DefaultSelector()class CallBackRequestSite:    def get_url(self, url):        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        self.client.setblocking(False)        self.host = url        self.data = b""        try:            self.client.connect((self.host, 80))        except BlockingIOError as e:            print("连接中...干些其他事")            pass        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)    def connected(self, key):        selector.unregister(key.fd)        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", self.host).encode("utf8"))        selector.register(self.client.fileno(), EVENT_READ, self.read_data)    def read_data(self, key):        recv_data = self.client.recv(1024)        if recv_data:            self.data += recv_data        else:            selector.unregister(key.fd)            print(self.data.decode("utf8"))def loop():    while 1:        # 根据系统是否支持,使用epoll还是select,优先epoll。默认阻塞,有活动连接就返回活动的连接列表        ready = selector.select()        for key, mask in ready:            callback = key.data            callback(key)if __name__ == '__main__':    demo = CallBackRequestSite()    demo.get_url("www.baidu.com")    loop()

上面方式是通过回调函数的方式实现的异步操作,虽然异步了,但是回调方式的缺点也很明显

回调方式的缺点

  • 回调太多,会导致调用链太深(回调地狱)
  • 不便于阅读和维护

5-3.协程

协程(Coroutines)是一种比线程更加轻量级的存在,正如一个进程可以拥有多个线程一样,一个线程可以拥有多个协程。

进程和线程是由系统控制,而协程是由程序自己控制的。好处是性能大幅度的提升,因为不会像线程切换那样消耗资源。

一个进程可以包含多个线程,一个线程也可以包含多个协程。一个线程的多个协程的运行是串行的。当一个协程运行时,其它协程必须挂起。

多核CPU,多个进程或一个进程内的多个线程是可以并行运行的,但一个线程内协程却绝对是串行的,无论CPU有多少个核。

如对进程、线程、协程不太清除的阅读这篇文件

在这里插入图片描述

5-3-1.yield生成器

协程的一个重要基础就是:程序运行一个协程能够暂停,去运行另外的协程。(如何暂停?才是关键)

yield就是可以满足程序(函数/方法)执行时,暂停。

def yield_method():    print("do start...")    yield 1    res2 = yield 2    print("do c...res2:{}".format(res2))    yield 3

这是最简单的函数中使用yield的例子,resp = yield_method()是一个generator生成器。

  • next(resp): 可以使得程序从一个yield到下一个yield的执行。(当到最后一个yield时,再执行next会报错)
  • send: 可以发送参数给之前yield,并执行到下一个yield
  • foreach:一般都使用foreach来处理

不懂的可以阅读下这篇文章

def yield_method():    print("do start...")    yield 1    print("do a...")    res1 = yield 2    print("do b...res1: {}".format(res1))    res2 = yield 3    print("do c...res2:{}".format(res2))    yield 4def call_hand():    resp = yield_method()    print(next(resp))  # 运行到 yield 1处,返回 1    print(resp.__next__())  # 从yield 1开始运行,到 yield 2处,返回 2    print(resp.send("got it"))  # 从yield 2开始运行,并把send值给yield 2的变量,到 yield 3处,返回 3    print(next(resp))  # 从yield 3开始运行,到 yield 4处,返回 4def call_for():    for item in yield_method():        print(item)if __name__ == '__main__':    print(yield_method())    print("*" * 30)    call_hand()    print("*" * 30)    call_for()

5-3-2.异步

在tornado中,定义协程代码有多种方式

  • @coroutine装饰器(装饰器又分多家的,tornado和python都有)(这不推荐使用了)
  • async方式 (推荐使用)

直接上代码,里面有注释,具体自己悟

import asynciofrom asyncio import coroutine as pycoroutinefrom time import sleep, timefrom tornado.gen import coroutine as tcoroutineasync def async_method():    await asyncio.sleep(2)    print("do a...")    await await_sleep_one_min()    print("do b...python的async和await")    await t_yield_sleep_one_min()    print("do c...tornado装饰器")    await py_yield_from_sleep_one_min()    print("do d...python装饰器")    await asyc_sleep_one_min()    print("do e...")# 不推荐使用, tornado装饰器方式@tcoroutinedef t_yield_sleep_one_min():    print("等了又等--1")    yield sleep(1)    print("等了又等--2")    yield sleep(1)    print("等了又等--3")    yield sleep(1)# 不推荐使用,python装饰器方式,python的旧语法@pycoroutinedef py_yield_from_sleep_one_min():    yield from asyncio.sleep(1)# 推荐使用(async与await组合,因为asyncio.sleep也是个异步代码,需要用await)async def await_sleep_one_min():    await asyncio.sleep(1)# 推荐使用(async单独使用,因为sleep不是个异步方法,不需要用await)async def asyc_sleep_one_min():    print('asyc_sleep_one_min----waiting')    sleep(1)if __name__ == '__main__':    start = time()    asyncio.run(async_method())    end = time()    print('cost time = ' + str(end - start))

简单总结下:

  • 协程定义可以用装饰器或者async修饰(推荐async),后面只说async形式了。
  • 协程A调用其他协程B(A和B函数都用async),协程A里调用协程B的方法加使用await关键字,当然普通函数(非协程)不需要加await的

5-3-3.执行

执行协程方法,也有多种:tornado的IOLoop和python的asyncio

python方式

方式一:一直run

import asyncioif __name__ == '__main__':    start = time()    # 一直run    asyncio.ensure_future(async_method())    asyncio.get_event_loop().run_forever()    end = time()    print('cost time = ' + str(end - start))

方式二:run完就结束

asyncio.run(async_method())

方式三:run完就结束

asyncio.get_event_loop().run_until_complete(async_method())

tornado方式

# 通过tornado的IOLoop来run (执行某个协程后停止事件循环)IOLoop.current().run_sync(async_method)

5-4.AsnycHttpClient

同步方式HTTPClient

from tornado import httpclientdef get_url(url):    http_client = httpclient.HTTPClient()    resp = http_client.fetch(url)    print(resp.body.decode("utf8"))if __name__ == '__main__':    web_site = "https://iworkh.gitee.io/blog/"    get_url(web_site)

异步方式AsnycHttpClient

import asynciofrom tornado import httpclientasync def async_get_url(url):    http_client = httpclient.AsyncHTTPClient()    resp = await http_client.fetch(url)    print(resp.body.decode("utf8"))if __name__ == '__main__':    web_site = "https://iworkh.gitee.io/blog/"    asyncio.run(async_get_url(web_site))

6.tornado web基础

tornado.web提供了一种带有异步功能并允许它扩展到大量开放连接的简单的web框架, 使其成为处理长连接(long polling) 的一种理想选择.

6-1.简单示例

import timefrom tornado import webfrom tornado.ioloop import IOLoopclass HelloWorldHandlerOne(web.RequestHandler):    async def get(self, *args, **kwargs):        # 别在handler中写同步的代码,会被阻塞的        time.sleep(5)        self.write("hello world---one")class HelloWorldHandlerTwo(web.RequestHandler):    def get(self, *args, **kwargs):        self.write("hello world---two")url_map = [    ("/test1", HelloWorldHandlerOne),    ("/test2", HelloWorldHandlerTwo)]if __name__ == '__main__':    app = web.Application(url_map, debug=True)    app.listen(8888)    IOLoop.current().start()

几点说明

  • handler类要继承web.RequestHandler,然后重写rest一些接口(get,post,put,delete等)
  • 启动类,使用了web.Application,传了参数url和handler对应关系。debug参数
  • 在handler的方法里不要调用同步的方法,这样会阻塞请求。

即便请了不同的接口(test1和test2),由于test1的handler中time.sleep(5),先请求test1,后请求test2,test2也会等test1响应完,才会响应test2(单线程的)。

debug为true

  • debug设置为true,修改了代码,不用重启服务
  • idea的Run启动:后台启动。如果关闭需要控制面板后台关闭
  • idea的Debug启动:修改内容,会自动停了。得再次debug启动
  • 命令行启动:后台启动。ctr+c可以退出

测试url

  • http://localhost:8888/test1
  • http://localhost:8888/test2

6-2.url匹配

url匹配主要是对于python的表达式的使用,如不了解可以先去了解下

from tornado import web, ioloopclass IndexHandler(web.RequestHandler):    async def get(self, *args, **kwargs):        self.write("首页")class UserHandler(web.RequestHandler):    async def get(self, name, *args, **kwargs):        self.write("hello world, {}".format(name))class ProductHandler(web.RequestHandler):    def get(self, name, count, *args, **kwargs):        self.write("购买{}个{}".format(count, name))class DivHandler(web.RequestHandler):    #  one,two这个要跟url里的(?P
\d+)/(?P
\d+)里的名称匹配 def get(self, one, two, *args, **kwargs): try: result = int(one) / int(two) self.write("{}/{}={}".format(one, two, result)) except ZeroDivisionError: # 发生错误,会跳转 self.redirect(self.reverse_url('error_page', 500)) passclass ErrorHandler(web.RequestHandler): def get(self, code, *args, **kwargs): self.write("发生错误:error_code: {}".format(code))url_map = [ ("/user/(\w+)/?", UserHandler), ("/product/(\w+)/(\d+)/?", ProductHandler), ("/calc/div/(?P
\d+)/(?P
\d+)/?", DivHandler), web.URLSpec("/index/?", IndexHandler, name="index"), web.URLSpec("/error/(?P
\d+)/?", ErrorHandler, name="error_page")]if __name__ == '__main__': app = web.Application(url_map, debug=True) app.listen(8888) print('started...') ioloop.IOLoop.current().start()

几点说明

  • /?表示0或1个/,即url最后可以有/也可以没有/
  • url定义,可以使用tuple简单实现,也可以使用web.URLSpec来创建对象,可以指定一些参数(url,handler,kwargs,name)
  • (?P<xxx>\d+),表示取一个组名,这个组名必须是唯一的,不重复的,没有特殊符号,然后跟参数里名称要一样
  • redirect重定向,reverse_url根据名称类获取url

测试url

  • http://localhost:8888/index/
    结果:

首页

  • http://localhost:8888/user/iworkh
    结果:

hello world, iworkh

  • http://localhost:8888/product/apple/3
    结果:

购买3个apple

  • http://localhost:8888/calc/div/4/2
    结果:

4/2=2.0

  • http://localhost:8888/calc/div/4/0
    结果:

发生错误:error_code: 500

  • http://localhost:8888/errpr/404
    结果:

发生错误:error_code: 404

6-3.动态传参

动态传参:将一些配置参数通过命令行、配置文件方式动态传给程序,而不是代码写死。增加灵活性。

官网关于options的介绍

动态传参主要使用的tornado.options某块,使用步骤:

  • 先定义哪些参数options.define()
  • 从命令行或文件获取参数值options.parse_command_line(),options.parse_config_file("/server.conf")
  • 使用参数options.xxx

6-3-1.命令方式

直接上代码

from tornado import options, web, ioloopoptions.define("port", default="8888", type=int, help="端口号")options.define("static_path", default="static", type=str, help="静态资源路径")class IndexHandler(web.RequestHandler):    async def get(self):        await self.finish('welcome to iworkh !!!')url_map = [    ("/?", IndexHandler)]setting = {
'debug': True}if __name__ == '__main__': # 命令行方式 options.parse_command_line() app = web.Application(url_map, **setting) app.listen(options.options.port) ioloop.IOLoop.current().start()

这注意: url_map是个list,而setting是个dic,去web.Application的初始化方法中就可以看到需要的参数类型

启动命令

python option_demo.py --port=8080

通过--port指定启动端口

测试url

  • http://localhost:8080/

6-3-2.文件方式

将上面启动main代码修改为以下:

if __name__ == '__main__':    # 配置文件方式    options.parse_config_file('config.ini')    app = web.Application(url_map, **setting)    app.listen(options.options.port)    ioloop.IOLoop.current().start()

配置文件内容

port = 8889

就加了端口号

测试url

  • http://localhost:8889/

6-4.handler

因为后面内容,请求的方式不仅仅是get,还有post等,所以使用Postman工具来请求

Postman很简单,自行下载使用下,这就不多介绍了

handler里内容和细节相当得多,具体可以参照官网

我们主要从以下几个方面入手

  • input (前台传来的参数,如何解析.)
  • process (RequestHandler常用的方法get,post,put,delete)
  • output (后台处理后,如何返回给前台)
  • 常用的handler(RequestHandler,ErrorHandler,FallbackHandler,RedirectHandler,StaticFileHandler)
  • 还有cookie和application,这部分看下官网

6-4-1.input

常用的函数

`get_argument`,`get_arguments`: 从post的form中解析参数,如果form中没有,那会从url中解析`get_query_argument`,`get_query_arguments`: 从url中解析参数`get_body_argument`,`get_body_arguments`: json格式数据解析`request``path_args``path_kwargs`

注意函数名有s和没有s的区别:

  • 有s表示接受的是一个数组,即使一个数据,也会转为数组形式.
  • 没有s的,但是传了多个值(比如:http://localshot:8888/user?name=lilei&&name=wangwu),最后一个会生效(name=wangwu)

结论:

方法 传参
get_query_argument(s) 参数在url中
get_argument(s) 参数在form中,没有头信息,form中没有还可以从url中获取
get_body_argument(s) 参数在form中,带有头信息
request.body 参数是json

演示代码

import jsonfrom tornado import web, ioloopclass UrlParamHandler(web.RequestHandler):    async def get(self):        name = self.get_query_argument("name")        age = self.get_query_argument("age")        self.write("name: {}, age: {}".format(name, age))        self.write('
') names = self.get_query_arguments("name") ages = self.get_query_arguments("age") self.write("names: {}, ages: {}".format(names, ages))class FormParamHandler(web.RequestHandler): async def post(self): name = self.get_argument("name") age = self.get_argument("age") self.write("name: {}, age: {}".format(name, age)) self.write('
') names = self.get_arguments("name") ages = self.get_arguments("age") self.write("names: {}, ages: {}".format(names, ages))class JsonWithFormHeadersParamHandler(web.RequestHandler): async def post(self): name = self.get_body_argument("name") age = self.get_body_argument("age") self.write("name: {}, age: {}".format(name, age))class JsonParamHandler(web.RequestHandler): async def post(self): parm = json.loads(self.request.body.decode("utf8")) name = parm["name"] age = parm["age"] self.write("name: {}, age: {}".format(name, age))url_handers = [ ("/url", UrlParamHandler), ("/form", FormParamHandler), ("/jsonwithformheaders", JsonWithFormHeadersParamHandler), ("/json", JsonParamHandler)]if __name__ == '__main__': app = web.Application(url_handers, debug=True) app.listen(8888) print("started...") ioloop.IOLoop.current().start()

测试url

测试工具Postman,当然也可以使用python的requests来请求

1. 参数在url中,get请求

代码请求接口方式测试

import requestsurl="http://localhost:8888/url?name=zhangsan&&age=20"resp = requests.request('GET', url).content.decode("utf8")print(resp)

postman或浏览器方式测试

  • http://localhost:8888/url?name=zhangsan&&age=20
    结果:

name: zhangsan, age: 20

names: [‘zhangsan’], ages: [‘20’]

  • http://localhost:8888/url?name=zhangsan&&age=20&&name=lisi&&age=28
    结果:

name: lisi, age: 28

names: [‘zhangsan’, ‘lisi’], ages: [‘20’, ‘28’]

由上面结果,可以看出函数名有s和没s的区别了,下面就其他方法就演示了,因为有s的,就不演示了跟这类似.

个人觉得,不用s,传多个值,为啥不直接传数组了.特别后面是json格式交互的时候,多个值的场景完全可以数组作为值来传递

2. 参数在form中,没有请求头,post请求

代码请求接口方式测试

import requestsurl = "http://localhost:8888/form"data = {
"name": "zhangsan", "age": 20}resp = requests.request('POST', url, data=data).content.decode("utf8")

postman方式测试

  • http://localhost:8888/form
    结果:

name: lisi, age: 28

names: [‘zhangsan’, ‘lisi’], ages: [‘20’, ‘28’]

在这里插入图片描述

3. 参数在from中,并含有请求头Content-Type:application/x-www-form-urlencoded,post请求

代码请求接口方式测试

import requestsurl = "http://localhost:8888/jsonwithformheaders"headers = {
"Content-Type": "application/x-www-form-urlencoded;"}data = {
"name": "zhangsan", "age": 20}resp = requests.request('POST', url, headers=headers, data=data).content.decode("utf8")print(resp)

postman方式测试

  • http://localhost:8888/jsonwithform
    在这里插入图片描述
    4. 参数是json,没有头,post请求

代码请求接口方式测试

import requestsurl = "http://localhost:8888/json"data = {
"name": "zhangsan", "age": 20}resp = requests.request('POST', url, json=data).content.decode("utf8")print(resp)

注意这这参数传的是json=data,而不是data=data

postman方式测试

  • http://localhost:8888/json

没有头,只能通过request.body来拿到参数,而且是bytes类型,需要decode后,json解析后得到dict

在这里插入图片描述

提示: 注意json值传递时候,有没header,取值方式不同.结论如下:

方法 传参
get_query_argument(s) 参数在url中
get_argument(s) 参数在form中,没有头信息
get_body_argument(s) 参数在form中,带有头信息
request.body 参数是json

6-4-2.process

注意: process里函数的执行顺序

initialize: 初始化,通过`URLSpec`可将参数传递到方法prepare: 在get/post等操作之前,会调用get/head/post/delete/patch/put/options: 一般业务逻辑处理on_finish: 在out操作之后,会调用

直接上代码

from tornado import web, ioloopclass ProcessDemoHandler(web.RequestHandler):    # initialize方法同步    def initialize(self, dbinfo):        print("initialize...")        self.dbinfo = dbinfo        print("数据库host:{}".format(self.dbinfo['db_host']))        pass    async def prepare(self):        print("prepare...")        pass    async def get(self):        print("get...")        self.write("success!!!")        pass    def on_finish(self):        print("finish...")        passinit_param = {
'dbinfo': {
'db_host': 'localhost', 'db_port': 3306, 'db_user': 'root', 'db_password': '123', }}url_handers = [ (r"/?", ProcessDemoHandler, init_param),]if __name__ == '__main__': app = web.Application(url_handers, debug=True) app.listen(8888) print("started...") ioloop.IOLoop.current().start()

需要关注以下几点:

1.顺序: initialize 👉 prepare 👉 get/post/put/delete/… 👉 on_finish

2.initialize和on_finish方法同步
3.如何将参数通过url_handers传给initialize方法

结果:

initialize...数据库host:localhostprepare...get...

6-4-3.output

set_status: 设置状态码set_headeradd_headerclear_headerwrite: 写数据,可以write多次,放缓存中,而不会中断当flush或者finish或者没消息断开时发送flush: 刷新数据到客户端finish: 写数据,写完断开了render/render_string: 模板方式渲染输出redirect: 重定向send_error/write_error: 发送错误render_embed_js/render_embed_css: 嵌入的js和cssrender_linked_js/render_linked_css: 链接的js和css
import asynciofrom tornado import web, ioloopclass OutputDemoHandler(web.RequestHandler):    async def get(self):        self.set_status(200)        self.write("error!!!")        self.write("warning!!!")        self.write("
") await self.flush() await asyncio.sleep(5) self.write("success!!!") await self.finish("done")url_handers = [ (r"/?", OutputDemoHandler),]if __name__ == '__main__': app = web.Application(url_handers, debug=True) app.listen(8888) print("started...") ioloop.IOLoop.current().start()

这里主要使用了write,flush以及finish操作,其他前面用过就不多介绍了.

没用过的render_xxx形式的函数,在模板中使用的。

结果:

error!!!warning!!!success!!!done

第一行先出来,应为flush了,过了5秒后,第二行数据出来

6-5.setting

setting里每项值类型都是dic类型

配置主要分以下几种类型

  • General settings
  • Authentication and security settings:
  • Template settings
  • Static file settings

这不会全部介绍,只介绍下常用的,在实际开发中,去官网查阅.

官网setting配置项

打开后,直接搜索关键字settings,就能定位到

常用配置

debug

是否开启debug模式

default_handler_class

当url没有配置到handler时,默认处理器.

static_path

静态资源路径

static_url_prefix

静态资源前缀

static_handler_class

静态资源handler(默认是tornado.web.StaticFileHandler)

静态资源代码演示

from tornado import web, ioloopclass SettingDemoHandler(web.RequestHandler):    async def get(self):        await self.finish("success")url_handers = [    (r"/?", SettingDemoHandler),]settings={
'debug': True, 'static_path': 'public/static', 'static_url_prefix': '/static/'}if __name__ == '__main__': app = web.Application(url_handers, **settings) app.listen(8888) print("started...") ioloop.IOLoop.current().start()

在相对路径下,创建public/static/images文件夹,并放一张图片进去

测试url

  • http://localhost:8888/static/images/moon.jpg

端口号后面的前缀url紧跟的路径,要跟static_url_prefix配置一致

6-6.模板

模板就不多介绍了,工作中我们一般开发都前后端分离,因此这就不过多纠缠了。(单为了文章的完整性,我还是写了个简单例子)

前端:react、vue等前端框架

后端:java、go、python等语言编写的web框架跟前端进行数据交互

有兴趣的可以看下官网:

主要包含以下几点

1.普通字符串模板,加载使用2.文件模板,加载使用3.常用的模板语法 {
{
data }},{
% set ...%},{
% raw %}4.导入python,调用函数,循环输出5.模板继承 {
% extends "xxx.html" %}6.UIModule使用 {
% module Template("foo.html", arg=42) %}

6-6-1.页面结构

基本结构

在这里插入图片描述

6-6-1.结果

结果

在这里插入图片描述

6-6-2.代码

代码主要分模板、UIMoudle、orderHandler、启动类

启动类和handler

from typing import Optionalfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler):    def get(self):        data = {
'title': '书本列表', 'orderList': [ {
'id': 1, 'name': 'java', 'price': 80, 'count': 1, 'link_to': '查看'}, {
'id': 2, 'name': 'springboot', 'price': 100, 'count': 2, 'link_to': '查看'}, {
'id': 3, 'name': 'springcloud', 'price': 120, 'count': 1, 'link_to': '查看'}, ] } self.render("tp_order.html", **data)class FooterUIMoudle(web.UIModule): def render(self): return self.render_string("uimodules/tp_footer.html") def embedded_css(self) -> Optional[str]: css = ''' .content { background: gray} a {text-decoration-line: none} ''' return cssurl_handlers = [ (r"/order", OrderHandler)]settings = {
"debug": True, "template_path": 'public/templates', "static_path": 'public/static', "static_url_prefix": "/static/", "ui_modules": {
'FooterUIMoudle': FooterUIMoudle }}if __name__ == '__main__': app = web.Application(url_handlers, **settings) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()

settings里定义ui_modulestemplate_path,static_path

注意给模板传参数的类型,在目录中也需要跟其对于方式取出值

tp_base.html模板

相对启动类路径:public/templates/tp_base.html

    
商城
{% block custom_css %} {% end %}
{% module Template("uimodules/tp_header.html") %}
{% block order_html_block%}

订单

{% end %}
{% module FooterUIMoudle() %} {% block custom_js %} {% end %}

里面使用UIMoudle、继承等手段

tp_base.html模板

相对启动类路径:public/templates/tp_base.html

{% extends 'tp_base.html' %}{% block order_html_block %}
{
{ total_price = total_price + item["price"] * item["count"] }}
{% set total_price = 0 %} {% for item in orderList %}
{% end %}
{
{ title }}
id 名称 价格 数量 详情
{
{ item["id"] }}
{
{ item["name"] }}
{
{ item["price"] }}元
{
{ item["count"] }}
{% raw item["link_to"] %}
总价格: {
{ total_price }}
{% end %}

里面使用继承、循环、raw、赋值等手段

uimoudles下的tp_footer.html模板

相对启动类路径:public/templates/uimodules/tp_header.html

welcome to iworkh !!!

uimoudles下的tp_footer.html模板

相对启动类路径:public/templates/uimodules/tp_footer.html

7.mysql

python里PyMySQL是同步,所以在tornado我们得用异步的,这介绍下aiomysql,它是基于PyMySQL的。用法差不错,只不过实现了异步操作。

安装依赖

pip install aiomysql

7-1.简单示例

import asyncioimport aiomysqlasync def test_example(loop):    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,                                      user='root', password='',                                      db='mysql', loop=loop)    async with pool.acquire() as conn:        async with conn.cursor() as cur:            await cur.execute("SELECT 42;")            print(cur.description)            (r,) = await cur.fetchone()            assert r == 42    pool.close()    await pool.wait_closed()loop = asyncio.get_event_loop()loop.run_until_complete(test_example(loop))

这是官网Github上给的示例,主要注意以下几点

  • async异步编写
  • 调用异步操作时,要使用await
  • async with的用法,异步上下文管理器,接受会将对应的资源关闭的(异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器,__aenter____aexit__方法。)

有兴趣可以去了解下async withasync for,百度下随便挑一篇阅读下就明白了(基本都雷同的😊)

创建表

CREATE TABLE `tb_order` (  `id` int(10) NOT NULL AUTO_INCREMENT,  `name` varchar(255) NOT NULL,  `price` decimal(10,2) DEFAULT NULL,  `count` int(5) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

插入一条数据

INSERT INTO `iworkh_tornado`.`tb_order`(`id`, `name`, `price`, `count`)VALUES (1, 'java', 80.00, 1);

7-1.查询数据

使用get请求来请求rest接口,参数方法可以放在url中,也可以方法在form中 (通过get_argument获取参数)

查询逻辑

import aiomysqlfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler):    def initialize(self, db_info) -> None:        self.db_info = db_info    async def get(self):        id = self.get_argument("id", None)        if not id:            raise Exception("id can not None")        db_id = db_name = db_price = db_count = None        async with aiomysql.create_pool(host=self.db_info["db_host"], port=self.db_info["db_port"],                                        user=self.db_info["db_user"], password=self.db_info["db_password"],                                        db=self.db_info["db_name"], charset=self.db_info["db_charset"]) as pool:            async with pool.acquire() as conn:                async with conn.cursor() as cur:                    sql = "SELECT `id`, `name`, `price`, `count` FROM TB_ORDER WHERE id = {};".format(id)                    print(sql)                    await cur.execute(sql)                    result = await cur.fetchone()                    try:                        if (result):                            db_id, db_name, db_price, db_count = result                    except Exception:                        pass            self.write("success, {}, {}, {}, {}".format(db_id, db_name, db_price, db_count))initParam = {
"db_info": {
'db_host': 'localhost', 'db_name': 'iworkh_tornado', 'db_port': 3306, 'db_user': 'iworkh', 'db_password': 'iworkh123', 'db_charset': 'utf8' }}url_handlers = [ (r"/order/?", OrderHandler, initParam)]if __name__ == '__main__': app = web.Application(url_handlers, debug=True) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()

测试代码

import requestsurl = "http://localhost:8888/order"data={
'id':'1'}resp = requests.get(url, data=data).content.decode("utf8")print(resp)

7-2.插入数据

查询逻辑

import aiomysqlfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler):    def initialize(self, db_info) -> None:        self.db_info = db_info    async def post(self):        db_name= self.get_argument("name")        db_price= self.get_argument("price")        db_count= self.get_argument("count")        async with aiomysql.create_pool(host=self.db_info["db_host"], port=self.db_info["db_port"],                                        user=self.db_info["db_user"], password=self.db_info["db_password"],                                        db=self.db_info["db_name"], charset=self.db_info["db_charset"]) as pool:            async with pool.acquire() as conn:                async with conn.cursor() as cur:                    sql = "INSERT INTO TB_ORDER (`name`, `price`, `count`) VALUES ('{}','{}','{}');"\                        .format(db_name, db_price, db_count)                    print(sql)                    await cur.execute(sql)                    await conn.commit()            self.write("save successfully")initParam = {
"db_info": {
'db_host': 'localhost', 'db_name': 'iworkh_tornado', 'db_port': 3306, 'db_user': 'iworkh', 'db_password': 'iworkh123', 'db_charset': 'utf8' }}url_handlers = [ (r"/order/?", OrderHandler, initParam)]if __name__ == '__main__': app = web.Application(url_handlers, debug=True) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()

测试代码

import requestsurl = "http://localhost:8888/order"data = {
'name':'springboot', 'price': 100.0, 'count':2}resp = requests.post(url, data=data).content.decode("utf8")print(resp)

7-3.更新数据

查询逻辑

import aiomysqlfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler):    def initialize(self, db_info) -> None:        self.db_info = db_info    async def put(self):        db_id= self.get_argument("id")        db_name= self.get_argument("name")        db_price= self.get_argument("price")        db_count= self.get_argument("count")        async with aiomysql.create_pool(host=self.db_info["db_host"], port=self.db_info["db_port"],                                        user=self.db_info["db_user"], password=self.db_info["db_password"],                                        db=self.db_info["db_name"], charset=self.db_info["db_charset"]) as pool:            async with pool.acquire() as conn:                async with conn.cursor() as cur:                    sql = "UPDATE TB_ORDER SET `name` = '{}', `price` = '{}', `count` = '{}' WHERE `id` = {};"\                        .format(db_name, db_price, db_count, db_id)                                       print(sql)                    await cur.execute(sql)                    await conn.commit()            self.write("save successfully")initParam = {
"db_info": {
'db_host': 'localhost', 'db_name': 'iworkh_tornado', 'db_port': 3306, 'db_user': 'iworkh', 'db_password': 'iworkh123', 'db_charset': 'utf8' }}url_handlers = [ (r"/order/?", OrderHandler, initParam)]if __name__ == '__main__': app = web.Application(url_handlers, debug=True) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()

测试代码

import requestsurl = "http://localhost:8888/order"data = {
'id': 1, 'name':'java', 'price': 60.0, 'count':1}resp = requests.put(url, data=data).content.decode("utf8")print(resp)

通过上面的aiomysl操作,虽然实现了异步操作,但是我们可以发现一些问题

  • 线程池操作,每次一个请求来都创建了个线程池,然后关闭
  • 原生sql操作,拼sql很累
  • 查询结果,解析很麻烦
  • model对象和数据库表关联操作,进行查询、更新、插入操作(有hibernate、jpa、Django orm等开发经验着,应该明白ORM框架带来的好处)

8.orm框架peewee

8-1.orm好处

我们通知之前对aiomysql对数据库的操作,我们发现了一些问题,操作很麻烦。

那使用orm有啥好处呢?

  • 操作更加方便,不用跟原生sql交互太多,便于维护。(特别写sql少了,,'等一些低价错误,很难发现,浪费开发时间)
  • orm可以屏蔽调底层数据(不管mysql、postgresql、sqlite等,都通过orm来操作,来适配)
  • 还能防止sql注入

python领域常用的一些ORM框架

  • Django’s ORM
  • peewee
  • SQLAlchemy

Django’s ORM

优点:    易用,学习曲线短    和Django紧密集合,用Django时使用约定俗成的方法去操作数据库缺点:    QuerySet速度不给力,会逼我用Mysqldb来操作原生sql语句。    不好处理复杂的查询,强制开发者回到原生SQL

peewee

优点:    Django式的API,使其易用    轻量实现,很容易和任意web框架集成    aysnc-peewee异步操作缺点:    不支持自动化schema迁移    不能像Django那样,使线上的mysql表结构生成结构化的模型。    多对多查询写起来不直观

SQLAlchemy

优点:    巨牛逼的企业级API,使得代码有健壮性和适应性    灵活的设计,使得能轻松写复杂查询缺点:    工作单元概念不常见    重量级 API,导致长学习曲线

8-2.peewee同步操作

数据操作,主要就增删改查功能,下面分别演示如何使用。

工作中使用前,最好把官网浏览一遍,掌握一些常用的api和细节

先pip安装

pip install peewee

8-2-1.定义model

base_model.py

import datetimefrom peewee import MySQLDatabase, Model, DateTimeFielddb_info = {
'host': '127.0.0.1', 'user': 'iworkh', 'password': 'iworkh123', 'charset': 'utf8', 'port': 3306}db = MySQLDatabase('iworkh_tornado', **db_info)class BaseModel(Model): created_date = DateTimeField(default=datetime.datetime.now) class Meta: database = db

user_model.py

from peewee import CharField, DateField, IntegerField, BooleanFieldfrom lesson05.models.base_model import *class User(BaseModel):    username = CharField(column_name='username', unique=True, null=False, max_length=50, verbose_name="姓名")    birthday = DateField(column_name='birthday', null=True, default=None, verbose_name='生日')    phone = CharField(column_name='phone', max_length=11)    mail = CharField(column_name='mail', max_length=50, verbose_name='邮件')    gender = IntegerField(column_name='gender', null=False, default=1, verbose_name='姓别')    is_admin = BooleanField(column_name='is_admin', default=False, verbose_name='是否是管理员')    class Meta:        table_name = "T_USER"
from peewee import ForeignKeyField, CharField, IntegerFieldfrom lesson05.models.base_model import BaseModelfrom lesson05.models.user_model import Userclass Pet(BaseModel):    user = ForeignKeyField(User, backref='pets')    name = CharField(index=True, column_name='name', max_length=50, verbose_name='名字')    age = IntegerField(column_name='age', verbose_name='年龄')    type = CharField(column_name='type', max_length=50, verbose_name='类型')    color = CharField(column_name='color', max_length=50, verbose_name='颜色')    description = CharField(column_name='description', max_length=500, verbose_name='描述')    class Meta:        table_name = "T_PET"

8-2-2.初始化表

import 省略if __name__ == '__main__':    db.connect()    table_list = [User, Pet]    db.create_tables(table_list)    print("end...")

8-2-3.插入

插入数据,可以使用savecreate方法来操作

  • save: 对象操作
  • create: 是类操作,看create函数上有@classmethod修饰符
import datetimeimport 省略userList = [    {
'username': 'zhangsan', 'birthday': '1988-08-08', 'gender': 1, 'mail': 'zhangsan@qq.com', 'phone': '111', 'is_admin': False}, {
'username': 'lisi', 'birthday': datetime.date(1999, 8, 8), 'gender': 1, 'mail': 'lisi@qq.com', 'phone': '222', 'is_admin': False}, {
'username': 'wangwu', 'birthday': datetime.date(2025, 8, 8), 'gender': 0, 'mail': 'wangwu@qq.com', 'phone': '333', 'is_admin': False}, {
'username': 'admin', 'birthday': datetime.date(2000, 8, 8), 'gender': 1, 'mail': 'admin@qq.com', 'phone': '444', 'is_admin': True}]def save(): user = User() user.username = 'iworkh_save' user.birthday = datetime.date(1988, 8, 8) user.gender = 0 user.mail = "157162006@qq.com" user.phone = "888888888" user.is_admin = True # 使用对象调用方法 row = user.save() print("save的返回值是值:{}".format(row))def save_list(): for item in userList: user = User(**item) user.save()def create(): user_dic = {
'username': 'iworkh_create2', 'birthday': datetime.date(1989, 8, 8), 'gender': 1, 'mail': '157162006@qq.com', 'phone': '888888888', 'is_admin': False } # 使用类调用方法 row = User.create(**user_dic) print("create的返回值值:{}".format(row))if __name__ == '__main__': save() create() save_list() print("end...")

8-2-4.查询

查询比较重要,但是peewee的api使用,跟原生sql语法优点类似

sql: select * from user where username like %iworkh%

peewee写法:User.select().where(User.username.contains(‘iworkh’))

import 省略def select_all():    # modelSelect = User.select() # sql: select * from user    fields = [User.username, User.phone]    modelSelect = User.select(*fields)  # sql: select username, phone from user    # 这还没有立即执行,返回的是modelSelect对象,当for或者execute时才会执行    for user in modelSelect:        print("{} --- {} --- {}".format(user.username, user.phone, user.is_admin))def select_conditon():    # 根据id取记录,如果id不存在,那么转化时候会报错    user_res_get_by_id = User.get_by_id(1)    print("get_by_id 结果:{}".format(user_res_get_by_id.username))    user_res_get = User.get(User.id == 1)    print("user_res_get 结果:{}".format(user_res_get.username))    user_res_dic = User[1]    print("user_res_dic 结果:{}".format(user_res_dic.username))    print("*" * 50)    # select * from user where username like %iworkh%    modelSelect = User.select().where(User.username.contains('iworkh'))    for user in modelSelect:        print("{} --- {}".format(user.username, user.is_admin))    print("*" * 50)    # select * from user ordery by birthday desc    modelSelect_order = User.select().order_by(User.birthday.desc())    for user in modelSelect_order:        print("{} --- {}".format(user.username, user.birthday))    print("*" * 50)    # 按每页3的大小分页,取第2页数据(从1开始计数)    modelSelect_pagable = User.select().paginate(page=2, paginate_by=3)    for user in modelSelect_pagable:        print("{} --- {}".format(user.username, user.id))if __name__ == '__main__':    select_all()    select_conditon()

8-2-5.更新

更新很简单,使用的是save函数,当save里传id值时,则认为是更新;id没有值None时,则是插入操作

import 省略def upate():    # 使用save方法即可更新,只要传的值中还有id    user_res_get_by_id = User.get_by_id(1)    user_res_get_by_id.username = user_res_get_by_id.username + '_update'    # 从数据中取处记录,并修改值后更新数据库    user_res_get_by_id.save()if __name__ == '__main__':   upate()

8-2-6.删除

删除记录有两种操作:

  • 类操作,根据id删除
  • 对象操作,这个对象要id值,底层也是根据id来删除记录的
import 省略def delete_by_id():    # 类操作    User.delete_by_id(3)def delete_instance():    user = User()    user.id = 4    # 对象操作,recursive参数控制关联数据是否删除    user.delete_instance()if __name__ == '__main__':    delete_by_id()    delete_instance()    print("end...")

8-2-7.技巧

技巧

peewee里还有很其他知识点,除了阅读官网提供的doc文档外,还有一些地方值得我们开发者注意的,就是源码下面的test和example模块。

留个问题:

peewee里transaction如何使用?(如何去官网doc和源码下快速找到你需要的答案呢?)

8-3.peewee-async操作

tornado调用接口,那得用异步的,所以peewee的同步操作,在tornado中肯定玩不转的。

peewee-async的接口主要分两部分

  • 高级API (推荐使用)

主要通过manager来操作

  • 低级API (不推荐使用)

主要通过peewee_async来操作

安装

pip install --pre peewee-async

8-3-1.定义models

定义model还是跟peewee一样,不过db的方式使用了peewee_async.MySQLDatabase

import datetimeimport peewee_asyncfrom peewee import Model, DateTimeField, TextField, CharFielddb_info = {
'host': '127.0.0.1', 'user': 'iworkh', 'password': 'iworkh123', 'charset': 'utf8', 'port': 3306}db = peewee_async.MySQLDatabase('iworkh_tornado', **db_info)class BaseModel(Model): created_date = DateTimeField(default=datetime.datetime.now) class Meta: database = dbclass BlogModel(BaseModel): title = CharField(column_name='title', max_length=50) content = TextField(column_name='content') class Meta: table_name = 't_blog'

8-3-2.基本操作

通过高级API的方式,进行增删改查操作

import asyncioimport peewee_asyncimport 省略了models的引入objects = peewee_async.Manager(db)async def save():    blog = {
'title': 'tornado入门', 'content': '详细内容请看文章'} await objects.create(BlogModel, **blog) print("保存成功")async def get_all(): all_blogs = await objects.execute(BlogModel.select()) for blog in all_blogs: print("id: {}, 文章标题:{}".format(blog.id, blog.title))async def delete(): blog = BlogModel() blog.id = 1 await objects.delete(blog) print("删除成功")def create_table(): BlogModel.create_table(True)def drop_table(): BlogModel.drop_table(True)async def test(): create_table() await save() print("*"*50) await get_all() print("*"*50) await delete() print("*"*50) await get_all() drop_table()if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(test()) print("end")

9.参考

10.推荐

整理不打字易,觉得有帮助就点个赞吧。

  • 个人博客刚开不久(内容还不多),主要用来辅助手册,写些零碎的知识点

  • 注册下个人用户,就可以管理自己的链接、享用各类学习手册,主要用来写手册,分享常用工具。

转载地址:http://vzhws.baihongyu.com/

你可能感兴趣的文章
学习笔记_vnpy实战培训day04_作业
查看>>
OCO订单(委托)
查看>>
学习笔记_vnpy实战培训day06
查看>>
回测引擎代码分析流程图
查看>>
Excel 如何制作时间轴
查看>>
股票网格交易策略
查看>>
matplotlib绘图跳过时间段的处理方案
查看>>
vnpy学习_04回测评价指标的缺陷
查看>>
ubuntu终端一次多条命令方法和区别
查看>>
python之偏函数
查看>>
vnpy学习_06回测结果可视化改进
查看>>
读书笔记_量化交易如何建立自己的算法交易01
查看>>
设计模式03_工厂
查看>>
设计模式04_抽象工厂
查看>>
设计模式05_单例
查看>>
设计模式06_原型
查看>>
设计模式07_建造者
查看>>
设计模式08_适配器
查看>>
设计模式09_代理模式
查看>>
设计模式10_桥接
查看>>